#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <netinet/ip.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <malloc.h>
#include <errno.h>
#include <sys/resource.h>
#include <sys/prctl.h>
#include <sys/file.h>
#include <sys/epoll.h>
#include <linux/vm_sockets.h>

#define log_switch 0
#include "req.h"
#include "log.h"
#include "libsocket.h"
#include "qtfs_fifo.h"


/* 
	总体架构
	主线程：epoll所有fifo的读或写，
	open线程：被epoll线程创建，阻塞打开fifo，打开后将fd加入epoll监听列表后退出。

	关于线程资源回收：open线程拉起时被设置为PTHREAD_CREATE_DETACHED属性，不进行主动回收。

	open线程正常打开fd后，自己退出；还未打开fd，主线程收到对端关闭管道消息时会直接调用pthread_cancel杀死。
		此处有可能有低概率并发资源泄漏问题：open线程刚好打开了fd，还没加入主线程时被主线程杀死。
*/

static int epollfd = -1;
static int sockfamily = AF_VSOCK; // 默认vsock
static pthread_mutex_t fifomutex = PTHREAD_MUTEX_INITIALIZER;
#define EPOLL_MAX_EVENT_NUMS 64

enum {
	FIFO_RET_OK,
	FIFO_RET_ERR,
	FIFO_RET_DEL, // only delete myself
	FIFO_RET_DEL_BOTH, // delete myself and peer
	FIFO_RET_SUSPEND, // 将此fd的事件挂起，不删除fd，只从epoll中去掉监听
};

enum {
	FIFO_INV, // 初始无效
	FIFO_READ,
	FIFO_WRITE,
	FIFO_BLOCK,
	FIFO_NONBLOCK,
};

enum {
	FIFO_PEER_PRE,
	FIFO_PEER_ADD,
	FIFO_PEER_POST,
};
// 主线程epoll关键数据结构
struct fifo_event_t {
	int fd;
	struct fifo_event_t *peerevt;

	/* 触发时的操作函数 */
	int (*handler)(struct fifo_event_t *event);
	// 仅在open阻塞状态有效，open完成后应该置空
	void *priv;
	int len; // valid read or write len
	int peerfd; // priv fd
	unsigned long seq_num;
	int block; // block fifo or nonblock
};

struct open_arg_t {
	struct qtreq_fifo_open *req;
	/* 此fifo对应在epoll主线程中的event结构，
	用于open成功后将fd加入main_event */
	struct fifo_event_t *main_evt;
	/* open与主线程有竞争资源，因为open
	是临时的少量线程，放在open结构里少占用资源，
	只有在open线程中需要用锁，以及epoll线程中
	在open状态的fd需要用锁，epoll线程非open状态
	不需要加锁 */
	pthread_mutex_t mutex;
	pthread_t *t;
};

static int fifo_rw_flags(unsigned int flags)
{
	if (flags & O_WRONLY)
		return FIFO_WRITE;
	return FIFO_READ;
}

static int fifo_block_flags(unsigned int flags)
{
	if (flags & O_NONBLOCK)
		return FIFO_NONBLOCK;
	return FIFO_BLOCK;
}

static int fifo_recv_with_timeout(int fd, char *msg, int len)
{
#define TMOUT_BLOCK_SIZE 1024
#define TMOUT_UNIT_MS 20
#define TMOUT_INTERVAL 1
#define TMOUT_MAX_MS 1000
	int total_recv = 0;
	int ret;
	int tmout_ms = ((len / TMOUT_BLOCK_SIZE) + 1) * TMOUT_UNIT_MS;
	if (len <= 0 || msg == NULL || fd < 0) {
		log_err("invalid param fd:%d len:%d or %s", fd, len, (msg == NULL) ? "msg is NULL" : "msg is not NULL");
		return 0;
	}
	if (tmout_ms > TMOUT_MAX_MS)
		tmout_ms = TMOUT_MAX_MS;
	do {
		ret = recv(fd, &msg[total_recv], len - total_recv, 0);
		if (ret < 0) {
			log_err("recv failed ret:%d errno:%d", ret, errno);
			return ret;
		}
		total_recv += ret;
		if (total_recv > len) {
			log_err("fatal error total recv:%d longger than target len:%d", total_recv, len);
			return 0;
		}
		if (total_recv == len) {
			return total_recv;
		}
		usleep(TMOUT_INTERVAL * 1000);
		tmout_ms -= TMOUT_INTERVAL;
	} while (tmout_ms > 0);
	log_err("Fatal error, the target recv len:%d and only %d length is received when it time out", len, total_recv);
	return 0;
}

struct fifo_event_t *fifo_add_event(int fd, struct fifo_event_t *peerevt, int (*handler)(struct fifo_event_t *), void *priv, unsigned int events)
{
	struct epoll_event evt;
	struct fifo_event_t *fifoevt = (struct fifo_event_t *)malloc(sizeof(struct fifo_event_t));
	if (fifoevt == NULL) {
		log_err("failed to malloc event, fd:%d peer:%d errno:%d", fd, peerevt->fd, errno);
		return NULL;
	}
	memset(fifoevt, 0, sizeof(struct fifo_event_t));
	fifoevt->fd = fd;
	fifoevt->peerevt = peerevt;
	fifoevt->handler = handler;
	fifoevt->priv = priv;
	evt.data.ptr = (void *)fifoevt;
	evt.events = events;
	if (-1 == epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &evt)) {
		log_err("epoll add fd:%d peer:%d failed, errno:%d", fd, peerevt->fd, errno);
		free(fifoevt);
		return NULL;
	}
	return fifoevt;
}

void fifo_del_event(struct fifo_event_t *evt)
{
	// close fd, 内核会回收epoll资源
	close(evt->fd);
	free(evt);
	return;
}

int fifo_resume_event(struct fifo_event_t *evt, unsigned int events)
{
	struct epoll_event event;
	event.data.ptr = (void *)evt;
	event.events = events;
	if (-1 == epoll_ctl(epollfd, EPOLL_CTL_ADD, evt->fd, &event)) {
		log_err("epoll ctl add fd:%d event failed, errno:%d.", evt->fd, errno);
		return -1;
	}
	return 0;
}

void fifo_suspend_event(struct fifo_event_t *evt)
{
	struct epoll_event event;
	event.data.ptr = (void *)evt;
	if (epoll_ctl(epollfd, EPOLL_CTL_DEL, evt->fd, &event) == -1) {
		log_err("suspend event fd:%d failed, errno:%d", evt->fd, errno);
	}
	return;
}

static int fifo_peer_index;
static struct fifo_event_t *fifo_peer_evt[EPOLL_MAX_EVENT_NUMS];
static int fifo_del_peer(int flag, struct fifo_event_t *me)
{
	// 自己已经先被peer加过了就不必加peer了，peer此时已释放
	for (int i = 0; i < fifo_peer_index; i++) {
		if (fifo_peer_evt[i] == me)
			return 0;
	}
	switch (flag) {
		case FIFO_PEER_PRE:
			fifo_peer_index = 0;
			memset(fifo_peer_evt, 0, sizeof(struct fifo_event_t *) * EPOLL_MAX_EVENT_NUMS);
			break;
		case FIFO_PEER_ADD:
			fifo_peer_evt[fifo_peer_index++] = me->peerevt;
			break;
		case FIFO_PEER_POST:
			for (int i = 0; i < fifo_peer_index; i++) {
				fifo_del_event(fifo_peer_evt[i]);
			}
			break;
		default:
			log_err("invalid flag:%d", flag);
			break;
	}
	return 0;
}

int fifo_mod_event(struct fifo_event_t *evt, unsigned int events)
{
	struct epoll_event event;
	event.data.ptr = (void *)evt;
	event.events = events;
	if (-1 == epoll_ctl(epollfd, EPOLL_CTL_MOD, evt->fd, &event)) {
		log_err("modify event fd:%d failed, errno:%d", evt->fd, errno);
		return -1;
	}
	return 0;
}

static void fifo_proc_ack(struct fifo_event_t *evt, int type, int sockfd, char *arg, int arglen)
{
	int ret;
	struct qtreq rsp;
	char *msg = (char *)malloc(sizeof(rsp) + arglen);
	if (msg == NULL) {
		log_err("malloc failed:%d.", errno);
		return;
	}

	rsp.type = type;
	rsp.err = 0;
	rsp.seq_num = evt->seq_num;
	rsp.len = arglen;

	memcpy(msg, &rsp, sizeof(rsp));
	memcpy(&msg[sizeof(rsp)], arg, arglen);

	ret = write(sockfd, msg, sizeof(struct qtreq) + arglen);
	free(msg);
	if (ret < 0) {
		log_err("fifo ack type:%d failed, sockfd:%d err:%d", type, sockfd, errno);
		return;
	}
	log_info("Type:%d ack successed, sockfd:%d.", type, sockfd);
	return;
}


int fifo_proc_unknown(struct fifo_event_t *evt)
{
	struct open_arg_t *oarg = (struct open_arg_t *)evt->priv;
	log_info("unknown read/write event fd:%d happend, open event not complete!", evt->fd);
	// 这不是预期的事件，直接删除此事件，且关联删除open线程
	pthread_mutex_lock(&fifomutex);
	// 如果priv为空表示open thread已退出
	if (evt->priv) {
		// 如果priv非空，则open thread还在阻塞状态，先杀死线程，然后释放资源，置空priv。
		pthread_cancel(*oarg->t);
		oarg = (struct open_arg_t *)evt->priv;
		free(oarg->t);
		free(oarg->req);
		free(oarg);
		evt->priv = NULL;
	}
	pthread_mutex_unlock(&fifomutex);
	return FIFO_RET_DEL;
}

// reverse是处理异常事件，正常情况下不会接受反向事件，
// 如果有反向事件则说明是断连事件
int fifo_proc_reverse(struct fifo_event_t *evt)
{
	log_info("reverse event happend.");
	return FIFO_RET_OK;
}

// 当读请求发生时，有可能阻塞，此时将fifo端加入监听，等到可读时
// 再触发处理，不在主线程中阻塞读
int fifo_proc_readable(struct fifo_event_t *evt)
{
	// 读完立即将自己置为EPOLLHUP，不连续读取
	int ret;
	char *msg;
	struct qtrsp_fifo_read *rsp;
	int readlen = evt->len;
	int error_ret = FIFO_RET_SUSPEND;
	if (readlen > QTFS_REQ_MAX_LEN) {
		log_err("Read rsp len:%d too large!", readlen);
		ret = EINVAL;
		goto err_ack;
	}

	msg = (char *)malloc(readlen + sizeof(struct qtrsp_fifo_read));
	if (msg == NULL) {
		log_err("malloc memory failed, errno:%d", errno);
		ret = ENOMEM;
		goto err_ack;
	}

	rsp = (struct qtrsp_fifo_read *)msg;
	ret = read(evt->fd, &msg[sizeof(struct qtrsp_fifo_read)], readlen);
	if (ret <= 0) {
		log_err("read from fifo:%d failed, readlen:%d, errno:%d", evt->fd, readlen, errno);
		ret = errno;
		free(msg);
		if (errno == EPIPE)
			error_ret = FIFO_RET_DEL_BOTH;
		goto err_ack;
	}
	rsp->err = 0;
	rsp->len = ret;
	fifo_proc_ack(evt, QTFS_REQ_READITER, evt->peerevt->fd, msg, ret + sizeof(struct qtrsp_fifo_read));

	log_info("readable event fd:%d peerfd:%d, errno:%d", evt->fd, evt->peerevt->fd, errno);
	free(msg);
	// 挂起readable任务，恢复监听网络请求
	if (fifo_resume_event(evt->peerevt, EPOLLIN|EPOLLHUP) == -1) {
		goto err_ack;
	}
	// 读完立即删除本监听，如果继续读后面再添加进来
	return FIFO_RET_SUSPEND;

err_ack:
	do {
		struct qtrsp_fifo_read errrsp;
		errrsp.err = ret;
		errrsp.len = 0;
		fifo_proc_ack(evt, QTFS_REQ_READITER, evt->peerevt->fd, (char *)&errrsp, sizeof(errrsp));
	} while (0);
	if (fifo_resume_event(evt->peerevt, EPOLLIN|EPOLLHUP) == -1)
		return FIFO_RET_DEL_BOTH;
	return error_ret;
}

int fifo_proc_writeable(struct fifo_event_t *evt)
{
	// 写完立即将自己置为EPOLLHUP，不连续写
	int ret;
	char *msg;
	struct qtrsp_fifo_write rsp;
	int writelen = evt->len;
	int error_ret = FIFO_RET_SUSPEND;
	if (writelen > QTFS_REQ_MAX_LEN) {
		log_err("Read rsp len:%d too large!", writelen);
		ret = EINVAL;
		goto err_ack;
	}
	msg = (char *)malloc(writelen + sizeof(struct qtrsp_fifo_write));
	if (msg == NULL) {
		log_err("malloc memory failed, errno:%d", errno);
		ret = errno;
		goto err_ack;
	}
	ret = fifo_recv_with_timeout(evt->peerevt->fd, msg, evt->len);
	if (ret <= 0) {
		log_err("recv socket write failed, fd:%d peer:%d, errno:%d.", evt->peerevt->fd, evt->fd, errno);
		// 主线程是串行的，peerevt如果是空，则没有readable监听，直接close peerfd即可
		ret = errno;
		free(msg);
		goto err_ack;
	}
	ret = write(evt->fd, msg, ret);
	if (ret <= 0) {
		log_err("write to fifo failed, ret:%d errno:%d", ret, errno);
		ret = errno;
		free(msg);
		if (errno == EPIPE)
			error_ret = FIFO_RET_DEL_BOTH;
		goto err_ack;
	}
	rsp.err = 0;
	rsp.len = ret;
	fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->peerevt->fd, (char *)&rsp, sizeof(struct qtrsp_fifo_write));

	log_info("writeable event fd:%d peerfd:%d, writelen:%lu, errno:%d", evt->fd, evt->peerevt->fd, rsp.len, errno);
	free(msg);
	// 挂起写fifo任务，重启监听网络请求任务
	if (fifo_resume_event(evt->peerevt, EPOLLIN) == -1) {
		goto err_ack;
	}
	return FIFO_RET_SUSPEND;

err_ack:
	do {
		struct qtrsp_fifo_write errrsp;
		errrsp.err = ret;
		errrsp.len = 0;
		fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->peerevt->fd, (char *)&errrsp, sizeof(errrsp));
	} while (0);
	if (fifo_resume_event(evt->peerevt, EPOLLIN|EPOLLHUP) == -1)
		return FIFO_RET_DEL_BOTH;
	return error_ret;
}

// 处理读请求，读可能阻塞，因为打开时已经确定是否阻塞型，
// 这里直接将peer改成监听状态去等待触发
int fifo_proc_read_req(struct fifo_event_t *evt)
{
	struct qtreq_fifo_read req;
	int ret;
	ret = fifo_recv_with_timeout(evt->fd, (char *)&req, sizeof(req));
	if (ret <= 0) {
		log_err("recv fifo read head failed, errno:%d.", errno);
		// 链接提前被中断了，有可能还没有peerevt，就直接关掉fd
		if (evt->peerevt == NULL) {
			if (evt->peerfd != 0)
				close(evt->peerfd);
			return FIFO_RET_DEL;
		}
		// 如果peerevt非空则要同时删除peer事件
		return FIFO_RET_DEL_BOTH;
	}
	log_info("recv read req len:%d", req.len);
	if (evt->block == FIFO_NONBLOCK) {
		struct fifo_event_t rd;
		rd.fd = evt->peerfd;
		rd.peerevt = evt;
		rd.len = req.len;
		rd.seq_num = evt->seq_num;
		fifo_proc_readable(&rd);
		return FIFO_RET_OK;
	}

	// if fifo is block, dont block on main thread
	if (evt->peerevt == NULL) {
		struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_readable, NULL, EPOLLIN);
		if (newevt == NULL) {
			log_err("add readable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd);
			goto early_ack;
		}
		evt->peerevt = newevt;
		newevt->len = req.len;
		newevt->seq_num = evt->seq_num;
	} else {
		evt->peerevt->seq_num = evt->seq_num;
		evt->peerevt->len = req.len;
		if (fifo_resume_event(evt->peerevt, EPOLLIN) == -1)
			goto early_ack;
	}
	return FIFO_RET_SUSPEND;

early_ack:
	do {
		struct qtrsp_fifo_read rsp;
		rsp.err = -EFAULT;
		rsp.len = 0;
		fifo_proc_ack(evt, QTFS_REQ_READITER, evt->fd, (char *)&rsp, sizeof(rsp));
	} while (0);
	return FIFO_RET_DEL_BOTH;
}

// 写
int fifo_proc_write_req(struct fifo_event_t *evt)
{
	struct qtreq_fifo_write req;
	int ret;
	ret = fifo_recv_with_timeout(evt->fd, (char *)&req, sizeof(req));
	if (ret <= 0) {
		log_err("recv fifo write head failed, errno:%d.", errno);
		// 链接提前被中断了，有可能还没有peerevt，就直接关掉fd
		if (evt->peerevt == NULL) {
			if (evt->peerfd != 0)
				close(evt->peerfd);
			return FIFO_RET_DEL;
		}
		// 如果peerevt非空则要同时删除peer事件
		return FIFO_RET_DEL_BOTH;
	}
	log_info("recv write req len:%d", req.len);
	if (evt->block == FIFO_NONBLOCK) {
		struct fifo_event_t wr;
		wr.fd = evt->peerfd;
		wr.peerevt = evt;
		wr.len = req.len;
		wr.seq_num = evt->seq_num;
		fifo_proc_writeable(&wr);
		return FIFO_RET_OK;
	}
	// if fifo is block, dont block on main thread
	if (evt->peerevt == NULL) {
		struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_writeable, NULL, EPOLLOUT);
		if (newevt == NULL) {
			log_err("add writeable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd);
			goto early_ack;
		}
		newevt->len = req.len;
		newevt->seq_num = evt->seq_num;
		evt->peerevt = newevt;
	} else {
		evt->peerevt->seq_num = evt->seq_num;
		evt->peerevt->len = req.len;
		if (fifo_resume_event(evt->peerevt, EPOLLOUT) == -1) {
			goto early_ack;
		}
	}

	// 此时tcp fd需要切换为writeable状态，不能同时用，将本任务挂起不再监听，writeable完事再切回来
	return FIFO_RET_SUSPEND;
early_ack:
	do {
		struct qtrsp_fifo_write rsp;
		rsp.err = -EFAULT;
		rsp.len = 0;
		fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->fd, (char *)&rsp, sizeof(rsp));
	} while (0);
	return FIFO_RET_DEL_BOTH;

}

// read/write/close req
int fifo_proc_new_req(struct fifo_event_t *evt)
{
	struct qtreq head;
	int ret;
	ret = fifo_recv_with_timeout(evt->fd, (char *)&head, sizeof(struct qtreq));
	if (ret <= 0) {
		log_err("recv qtreq head failed, errno:%d.", errno);
		// 如果peerevt非空则要同时删除peer事件
		return FIFO_RET_DEL_BOTH;
	}
	switch (head.type) {
		case QTFS_REQ_CLOSE:
			log_info("recv close req, close tcp fd:%d fifofd:%d", evt->fd, evt->peerfd);
			evt->seq_num = head.seq_num;
			fifo_proc_ack(evt, QTFS_REQ_CLOSE, evt->fd, NULL, 0);
			if (evt->peerevt == NULL) {
				close(evt->peerfd);
				return FIFO_RET_DEL;
			}
			// 如果peerevt非空则要同时删除peer事件
			return FIFO_RET_DEL_BOTH;

		case QTFS_REQ_READITER:
			log_info("recv readiter req, fd:%d peerfd:%d", evt->fd, evt->peerfd);
			evt->seq_num = head.seq_num;
			return fifo_proc_read_req(evt);

		case QTFS_REQ_WRITE:
			log_info("recv write req, fd:%d peerfd:%d", evt->fd, evt->peerfd);
			evt->seq_num = head.seq_num;
			return fifo_proc_write_req(evt);

		default:
			log_info("recv invalid req:%u fd:%d peerfd:%d", head.type, evt->fd, evt->peerfd);
			break;
	}

	return FIFO_RET_ERR;
}

void *fifo_open_thread(void *arg)
{
	int fd;
	struct fifo_event_t *evt = (struct fifo_event_t *)arg;
	struct open_arg_t *oarg = (struct open_arg_t *)evt->priv;
	int rw;
	int err = 0;
	struct fifo_event_t *newevt;
	struct qtrsp_fifo_open rsp = {.err = 0};

	fd = open(oarg->req->path, oarg->req->flags, oarg->req->mode);
	if (fd < 0) {
		log_err("open fifo:%s failed, fd:%d errno:%d", oarg->req->path, fd, errno);
		goto err_end;
	}
	rw = fifo_rw_flags(oarg->req->flags);
	log_info("Recv open request fifo:%s flags:%x mode:%x rw:%d", oarg->req->path, oarg->req->flags, oarg->req->mode, rw);

	// read和write，代表的是向server端fifofd的操作方向，在初始状态，本
	// 代理不应该主动，只监听挂断事件，在通信对端发来read/write消息才
	// 改为监听可读/可写状态并进行实际读写。
	pthread_mutex_lock(&fifomutex);
	if (evt->priv == NULL) {
		log_err("fatal error, oarg is invalid.");
		goto end;
	}
	oarg->main_evt->peerevt = NULL;
	oarg->main_evt->handler = fifo_proc_new_req;
	oarg->main_evt->peerfd = fd;
	oarg->main_evt->block = fifo_block_flags(oarg->req->flags);

	rsp.fd = fd;
	// 按理说每个fifo的链接只有自己串行使用，不需要考虑两个线程竞争
	fifo_proc_ack(oarg->main_evt, QTFS_REQ_OPEN, oarg->main_evt->fd, (char *)&rsp, sizeof(rsp));

	goto end;

err_end:
	rsp.err = errno;
	fifo_proc_ack(oarg->main_evt, QTFS_REQ_OPEN, oarg->main_evt->fd, (char *)&rsp, sizeof(rsp));
	
end:
	free(oarg->t);
	free(oarg->req);
	free(oarg);
	evt->priv = NULL;
	pthread_mutex_unlock(&fifomutex);
	return NULL;
}

int fifo_proc_open_req(struct fifo_event_t *evt)
{
	struct open_arg_t *oarg;
	struct qtreq_fifo_open *req;
	struct qtreq head;
	pthread_t *t;
	pthread_attr_t attr;
	int ret;
	ret = fifo_recv_with_timeout(evt->fd, (char *)&head, sizeof(head));
	if (ret <= 0) {
		log_err("recv open head failed.");
		return FIFO_RET_ERR;
	}
	log_info("recv head type:%u seq:%lu len:%d", head.type, head.seq_num, head.len);
	if (head.len > sizeof(struct qtreq_fifo_open)) {
		log_err("open head len:%d is too big", head.len);
		return FIFO_RET_ERR;
	}
	// 按需申请path长度
	req = (struct qtreq_fifo_open *)malloc(head.len + 1);
	if (req == NULL) {
		// todo: 既然没有成功，要清理掉后面的消息体
		log_err("alloc memory failed, errno:%d", errno);
		return FIFO_RET_ERR;
	}
	oarg = (struct open_arg_t *)malloc(sizeof(struct open_arg_t));
	if (oarg == NULL) {
		log_err("alloc open arg memory failed, errno:%d", errno);
		free(req);
		return FIFO_RET_ERR;
	}
	memset(req, 0, head.len + 1);
	ret = fifo_recv_with_timeout(evt->fd, (char *)req, head.len);
	if (ret <= 0) {
		log_err("recv req failed.");
		free(req);
		free(oarg);
		return FIFO_RET_ERR;
	}
	pthread_mutex_init(&oarg->mutex, NULL);
	oarg->main_evt = evt;
	oarg->req = req;
	evt->seq_num = head.seq_num;

	// create new open thread
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

	t = (pthread_t *)malloc(sizeof(pthread_t));
	if (t == NULL) {
		log_err("alloc memory failed, errno:%d", errno);
		free(req);
		free(oarg);
		return FIFO_RET_ERR;
	}
	evt->priv = oarg;
	oarg->t = t;
	// 临时状态机，暂时不知道是读是写
	evt->handler = fifo_proc_unknown;
	pthread_create(t, &attr, fifo_open_thread, evt);

	log_info("Start new fifo open thread head:%u, len:%d", head.type, head.len);
	return FIFO_RET_OK;
}

// mainsock 仅处理新连接建联，添加事件，
// 单线程不能阻塞等消息，必须尽快让出线程使用权
int fifo_proc_main_sock(struct fifo_event_t *evt)
{
	int ret;
	struct qtreq headreq;
	int connfd = libsock_accept(evt->fd, sockfamily);
	if (connfd < 0) {
		log_err("accept new connection failed, ret:%d errno:%d", connfd, errno);
		return FIFO_RET_ERR;
	}
	// 新建联肯定是open请求了
	if (fifo_add_event(connfd, NULL, fifo_proc_open_req, NULL, EPOLLIN|EPOLLHUP) < 0) {
		log_err("add new connection event failed.");
		return FIFO_RET_ERR;
	}

	return FIFO_RET_OK;
}


extern int engine_run;
void *fifo_server_main_thread(void *arg)
{
	int indx = 0;

	int sockfd;
	struct fifo_server_arg_t *parg = (struct fifo_server_arg_t *)arg;
	struct epoll_event *evts = NULL;

	// init socket server
	if (parg->family == AF_INET) {
		sockfd = libsock_build_inet_connection(parg->addr, parg->port, LIBSOCK_SERVER);
	} else {
		sockfd = libsock_build_vsock_connection(parg->cid, parg->port, LIBSOCK_SERVER);
	}
	if (sockfd < 0) {
		log_err("fifo server main thread start failed, please check input argument!");
		return NULL;
	}
	sockfamily = parg->family;

	// create epoll
	epollfd = epoll_create1(0);
	if (epollfd == -1) {
		log_err("fifo server epoll create failed, errno:%d", errno);
		goto epoll_create_err;
	}
	evts = calloc(EPOLL_MAX_EVENT_NUMS, sizeof(struct fifo_event_t));
	if (evts == NULL) {
		log_err("fifo server calloc events failed, errno:%d", errno);
		goto evts_calloc_err;
	}

	fifo_add_event(sockfd, NULL, fifo_proc_main_sock, NULL, EPOLLIN);

	while (engine_run) {
		int ret;
		struct fifo_event_t *event;
		int n = epoll_wait(epollfd, evts, EPOLL_MAX_EVENT_NUMS, 1000);
		if (n == 0)
			continue;
		if (n < 0) {
			log_err("epoll wait err:%d, errno:%d", n, errno);
			continue;
		}
		fifo_del_peer(FIFO_PEER_PRE, NULL);
		for (int i = 0; i < n; i ++) {
			event = (struct fifo_event_t *)evts[i].data.ptr;
			log_info("new event fd:%d events:0x%x", event->fd, evts[i].events);
			ret = event->handler(event);
			if (ret == FIFO_RET_SUSPEND) {
				fifo_suspend_event(event);
			} else if (ret == FIFO_RET_DEL) {
				fifo_del_event(event);
			} else if (ret == FIFO_RET_DEL_BOTH) {
				fifo_del_peer(FIFO_PEER_ADD, event);
				fifo_del_event(event);
			}
		}
		fifo_del_peer(FIFO_PEER_POST, NULL);
	}
	
	return NULL;

evts_calloc_err:
	close(epollfd);
	epollfd = -1;
epoll_create_err:
	close(sockfd);
	log_err("fifo server error exit.");
	return NULL;
}
